Spark Session

SparkContext is the main entry point to Spark Core. It lets an application access the rest of Spark's functionality and establishes the connection to the Spark execution environment, giving access to the Spark cluster through a resource manager. The SparkContext acts as the master of the Spark application.

It offers various functions, such as the following (a short sketch follows the list):
  • Getting the current status of a Spark application
  • Canceling a job
  • Canceling a stage
  • Running a job synchronously
  • Running a job asynchronously
  • Persisting an RDD
  • Un-persisting an RDD
  • Programmable dynamic allocation
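A minimal Scala sketch of a few of these operations, assuming a local run (names such as ContextDemo and numbersRDD are illustrative only):

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("ContextDemo").setMaster("local[2]"))

val numbersRDD = sc.parallelize(1 to 1000)
numbersRDD.persist()        // persist the RDD in memory
numbersRDD.count()          // run a job synchronously

// current status of the application: IDs of any jobs still running
println(sc.statusTracker.getActiveJobIds().mkString(", "))

numbersRDD.unpersist()      // un-persist the RDD
sc.cancelAllJobs()          // cancel any jobs that are still running
sc.stop()
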
A Spark “driver” is an application that creates a SparkContext for executing one or more jobs in the Spark cluster; it lets your Spark application access the cluster with the help of a resource manager. SparkSession is the unified entry point of a Spark application from Spark 2.0 onwards. It provides a way to interact with Spark's various functionality using fewer constructs. All functionality available with SparkContext is also available in SparkSession, which additionally provides APIs to work with DataFrames and Datasets. Instead of having a separate Spark context, Hive context, and SQL context, everything is now encapsulated in a Spark session.

Spark Session also includes all the APIs available in the different contexts:
  • Spark Context,
  • SQL Context,
  • Streaming Context,
  • Hive Context.
Create Spark Session
In previous versions of Spark, you had to create a SparkConf and SparkContext to interact with Spark, as shown here:
// set up the Spark configuration and create the contexts
val sparkConf = new SparkConf()
  .setAppName("SparkSessionExample")
  .setMaster("local")
  .set("spark.some.config.option", "some-value")

// your handle to SparkContext, used to create other contexts such as SQLContext
val sc = new SparkContext(sparkConf)
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

Whereas in Spark 2.0 the same effect can be achieved through SparkSession, without explicitly creating SparkConf, SparkContext, or SQLContext, as they are all encapsulated within the SparkSession. Using a builder design pattern, it instantiates a SparkSession object if one does not already exist, along with its associated underlying contexts. The entry point into all functionality in Spark is the SparkSession class.
 
import org.apache.spark.sql.SparkSession

// warehouseLocation is assumed to be a path pointing to the Hive warehouse directory
val spark = SparkSession.builder()
  .master("local[4]")
  .appName("Spark SQL basic example")
  .config("spark.some.config.option", "some-value")
  .config("spark.sql.warehouse.dir", warehouseLocation)
  .enableHiveSupport()
  .getOrCreate()

 
The SparkSession builder returns an existing session if one has already been created, or creates a new one and assigns it as the global default. enableHiveSupport here is similar to creating a HiveContext: all it does is enable access to the Hive metastore, Hive SerDes, and Hive UDFs. We can access the Spark context and the other contexts through the SparkSession object, for example (see the short sketch after this list):
  • spark.sparkContext
  • spark.sqlContext
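A minimal sketch using the spark session created above:

// low-level RDD API via the embedded SparkContext
val rdd = spark.sparkContext.parallelize(Seq(1, 2, 3))
println(rdd.count())

// the legacy SQLContext is still reachable through the session
val df = spark.sqlContext.sql("SELECT 1 AS id")
df.show()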
Spark Session in PySpark
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master("local[*]") \
    .appName('PySpark_Tutorial') \
    .getOrCreate()

# where the '*' represents all the cores of the CPU.
Spark Session for MongoDB
import com.mongodb.spark._
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local")
  .appName("MongoSparkConnectorIntro")
  .config("spark.mongodb.input.uri", "mongodb://127.0.0.1/test.myCollection")
  .config("spark.mongodb.output.uri", "mongodb://127.0.0.1/test.myCollection")
  .getOrCreate()
Why do I need Spark Session when I already have Spark Context?
As we know, in previous versions (before 2.0) SparkContext was the entry point for Spark. Since the RDD was the main API, it was created and manipulated through the context's APIs, and for every other API we needed a different context: for streaming a StreamingContext, for SQL a SQLContext, and for Hive a HiveContext. But as the Dataset and DataFrame APIs became the new standard, Spark needed a single entry point built for them. So in Spark 2.0 we got a new entry point for the Dataset and DataFrame APIs called SparkSession. It is a combination of SQLContext, HiveContext, and (in the future) StreamingContext. All the APIs available on those contexts are available on SparkSession, and SparkSession holds a SparkContext for the actual computation, so developers no longer need to worry about creating the different contexts.

Apart from this big advantage, the Spark developers also tried to solve the problem of multiple users sharing the same Spark context. Say multiple users access the same notebook environment, which has a shared Spark context, and the requirement is to give each user an isolated environment on top of it. Prior to 2.0, the only solution was to create multiple Spark contexts, one per isolated environment or user, which is an expensive operation (generally one per JVM). With the introduction of SparkSession, this issue has been addressed.

How do I create multiple Spark Sessions?
Spark gives a straightforward API to create a new session that shares the same Spark context: spark.newSession() creates a new SparkSession object. If we look closely at the hashes of spark and session2, they are different; in contrast, the underlying Spark context is the same.

val session1 = spark               // the default session
val session2 = spark.newSession()  // a new session sharing the same SparkContext

// in the spark-shell, the two sessions print with different hashes,
// but the underlying SparkContext is the same object:
session1.sparkContext
session2.sparkContext


We can also verify that the Spark session gives a unified view of all the contexts, with isolation of configuration and environment. We can query directly, without creating a SQLContext as we used to, and run queries the same way. Say we register a temporary view called people_session1 in the session spark: it will be visible only in that session. If we then look from the new session session2, the view is not visible there, and we could even create another view with the same name in session2 without affecting the one in the spark session.

// empDF is assumed to be an existing DataFrame, e.g. built from a small sequence
import spark.implicits._
val empDF = Seq(("Alice", 1), ("Bob", 2)).toDF("name", "id")

// register a temp view in the default session; it is visible only there
empDF.createOrReplaceTempView("people_session1")
spark.sql("show tables").show()
spark.catalog.listTables.show()

// session2 does not see the view registered in the default session
session2.sql("show tables").show()
session2.catalog.listTables.show()

This isolation applies to configuration as well: each session can have its own configs.
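For the output below, we assume each session set the option itself beforehand (the original snippet does not show this step), for example:

spark.conf.set("spark.sql.crossJoin.enabled", "true")       // affects only the default session
session2.conf.set("spark.sql.crossJoin.enabled", "false")   // affects only session2
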
spark.conf.get("spark.sql.crossJoin.enabled")
res21: String = true

session2.conf.get("spark.sql.crossJoin.enabled")
res25: String = false

Below is an example of creating a SparkSession with the builder pattern and inspecting its SparkContext. The SparkContext is created only once per application; even if you try to create another one, getOrCreate still returns the existing SparkContext (and, here, the existing SparkSession).

Example
package com.bigdata.spark
import org.apache.spark.sql.SparkSession

object SparkSessionTest {
  def main(args:Array[String]): Unit ={
    val sparkSession1 = SparkSession.builder()
      .master("local[1]")
      .appName("Sparktutorial1")
      .getOrCreate();

    println("First SparkContext:")
    println("APP Name :"+sparkSession1.sparkContext.appName);
    println("Deploy Mode :"+sparkSession1.sparkContext.deployMode);
    println("Master :"+sparkSession1.sparkContext.master);

    val sparkSession2 = SparkSession.builder()
      .master("local[1]")
      .appName("SparkTutorial2")
      .getOrCreate();

    println("Second SparkContext:")
    println("APP Name :"+sparkSession2.sparkContext.appName);
    println("Deploy Mode :"+sparkSession2.sparkContext.deployMode);
    println("Master :"+sparkSession2.sparkContext.master);
  }
}

Output
First SparkContext:
APP Name :Sparktutorial1
Deploy Mode :client
Master :local[1]

Second SparkContext:
APP Name :Sparktutorial1
Deploy Mode :client
Master :local[1]
